fix: preserve source id during sync imports#541
Conversation
There was a problem hiding this comment.
Pull request overview
This PR fixes multi-source sync/import so that sync --source <id> correctly writes and reconciles imported pages (and related chunks/tags/versions) under the requested sources.id instead of accidentally falling back to the default source.
Changes:
- Threads optional
sourceIdthrough sync → import → import-file paths. - Extends engine CRUD used by import (pages/chunks/tags/versions) to accept optional
sourceIdand scope DB lookups/writes. - Updates both Postgres and PGLite engine implementations to respect the source scope.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| src/core/types.ts | Adds optional source_id to PageInput to allow per-page source scoping. |
| src/core/engine.ts | Extends BrainEngine CRUD signatures to accept optional sourceId for source-scoped operations. |
| src/core/postgres-engine.ts | Implements source-scoped page/chunk/tag/version operations for Postgres. |
| src/core/pglite-engine.ts | Implements source-scoped page/chunk/tag/version operations for PGLite. |
| src/core/import-file.ts | Threads sourceId through import pipeline and passes it into engine operations during import. |
| src/commands/sync.ts | Passes opts.sourceId into import calls during sync, including full-sync import. |
| src/commands/import.ts | Passes optional sourceId through to importFile during directory import. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| @@ -131,8 +131,8 @@ export interface BrainEngine { | |||
| getEmbeddingsByChunkIds(ids: number[]): Promise<Map<number, Float32Array>>; | |||
|
|
|||
| // Chunks | |||
| upsertChunks(slug: string, chunks: ChunkInput[]): Promise<void>; | |||
| getChunks(slug: string): Promise<Chunk[]>; | |||
| upsertChunks(slug: string, chunks: ChunkInput[], sourceId?: string): Promise<void>; | |||
| getChunks(slug: string, sourceId?: string): Promise<Chunk[]>; | |||
There was a problem hiding this comment.
With getPage/getChunks/getTags/... now defaulting to source_id='default' when sourceId is omitted, the interface becomes inconsistent with listPages() / getAllSlugs() (which currently return entries across all sources). Callers that iterate via listPages() and then call getChunks(slug) / getTags(slug) (e.g. embed/export flows) will silently operate only on the default source for non-default pages. Consider adding source scoping to listPages/getAllSlugs (or including source_id in the returned Page shape) so multi-source callers can preserve source context end-to-end.
| export async function importFromContent( | ||
| engine: BrainEngine, | ||
| slug: string, | ||
| content: string, | ||
| opts: { noEmbed?: boolean } = {}, | ||
| opts: { noEmbed?: boolean; sourceId?: string } = {}, | ||
| ): Promise<ImportResult> { | ||
| // Reject oversized payloads before any parsing, chunking, or embedding happens. | ||
| // Uses Buffer.byteLength to count UTF-8 bytes the same way disk size would, | ||
| // so the network path behaves identically to the file path. | ||
| const byteLength = Buffer.byteLength(content, 'utf-8'); | ||
| if (byteLength > MAX_FILE_SIZE) { | ||
| return { | ||
| slug, | ||
| status: 'skipped', | ||
| chunks: 0, | ||
| error: `Content too large (${byteLength} bytes, max ${MAX_FILE_SIZE}). Split the content into smaller files or remove large embedded assets.`, | ||
| }; | ||
| } | ||
|
|
||
| const parsed = parseMarkdown(content, slug + '.md'); | ||
|
|
||
| // Hash includes ALL fields for idempotency (not just compiled_truth + timeline) | ||
| const hash = createHash('sha256') | ||
| .update(JSON.stringify({ | ||
| title: parsed.title, | ||
| type: parsed.type, | ||
| compiled_truth: parsed.compiled_truth, | ||
| timeline: parsed.timeline, | ||
| frontmatter: parsed.frontmatter, | ||
| tags: parsed.tags.sort(), | ||
| })) | ||
| .digest('hex'); | ||
|
|
||
| const parsedPage: ParsedPage = { | ||
| type: parsed.type, | ||
| title: parsed.title, | ||
| compiled_truth: parsed.compiled_truth, | ||
| timeline: parsed.timeline || '', | ||
| frontmatter: parsed.frontmatter, | ||
| tags: parsed.tags, | ||
| }; | ||
|
|
||
| const existing = await engine.getPage(slug); | ||
| const existing = await engine.getPage(slug, opts.sourceId); | ||
| if (existing?.content_hash === hash) { | ||
| return { slug, status: 'skipped', chunks: 0, parsedPage }; | ||
| } | ||
|
|
||
| // Chunk compiled_truth and timeline | ||
| const chunks: ChunkInput[] = []; | ||
| if (parsed.compiled_truth.trim()) { | ||
| for (const c of chunkText(parsed.compiled_truth)) { | ||
| chunks.push({ chunk_index: chunks.length, chunk_text: c.text, chunk_source: 'compiled_truth' }); | ||
| } | ||
| } | ||
| if (parsed.timeline?.trim()) { | ||
| for (const c of chunkText(parsed.timeline)) { | ||
| chunks.push({ chunk_index: chunks.length, chunk_text: c.text, chunk_source: 'timeline' }); | ||
| } | ||
| } | ||
|
|
||
| // v0.20.0 Cathedral II Layer 8 D2 — extract fenced code blocks from | ||
| // compiled_truth as first-class code chunks. | ||
| if (parsed.compiled_truth.trim()) { | ||
| const fenceChunks = await extractFencedChunks(parsed.compiled_truth, chunks.length); | ||
| chunks.push(...fenceChunks); | ||
| } | ||
|
|
||
| // Embed BEFORE the transaction (external API call). | ||
| // v0.14+ (Codex C2): embedding failure PROPAGATES. Silent drop accumulates | ||
| // unembedded pages invisibly. Caller can pass opts.noEmbed=true to skip. | ||
| if (!opts.noEmbed && chunks.length > 0) { | ||
| const embeddings = await embedBatch(chunks.map(c => c.chunk_text)); | ||
| for (let i = 0; i < chunks.length; i++) { | ||
| chunks[i].embedding = embeddings[i]; | ||
| chunks[i].token_count = Math.ceil(chunks[i].chunk_text.length / 4); | ||
| } | ||
| } | ||
|
|
||
| // Transaction wraps all DB writes | ||
| await engine.transaction(async (tx) => { | ||
| if (existing) await tx.createVersion(slug); | ||
| if (existing) await tx.createVersion(slug, opts.sourceId); | ||
|
|
||
| await tx.putPage(slug, { | ||
| type: parsed.type, | ||
| title: parsed.title, | ||
| compiled_truth: parsed.compiled_truth, | ||
| timeline: parsed.timeline || '', | ||
| frontmatter: parsed.frontmatter, | ||
| content_hash: hash, | ||
| source_id: opts.sourceId, | ||
| }); | ||
|
|
||
| // Tag reconciliation: remove stale, add current | ||
| const existingTags = await tx.getTags(slug); | ||
| const existingTags = await tx.getTags(slug, opts.sourceId); | ||
| const newTags = new Set(parsed.tags); | ||
| for (const old of existingTags) { | ||
| if (!newTags.has(old)) await tx.removeTag(slug, old); | ||
| if (!newTags.has(old)) await tx.removeTag(slug, old, opts.sourceId); | ||
| } | ||
| for (const tag of parsed.tags) { | ||
| await tx.addTag(slug, tag); | ||
| await tx.addTag(slug, tag, opts.sourceId); | ||
| } | ||
|
|
||
| if (chunks.length > 0) { | ||
| await tx.upsertChunks(slug, chunks); | ||
| await tx.upsertChunks(slug, chunks, opts.sourceId); | ||
| } else { | ||
| // Content is empty — delete stale chunks so they don't ghost in search results | ||
| await tx.deleteChunks(slug); | ||
| await tx.deleteChunks(slug, opts.sourceId); | ||
| } |
There was a problem hiding this comment.
This change introduces source-scoped import behavior, but there doesn’t appear to be a regression test asserting that importing with sourceId writes/updates pages, tags, chunks, and versions under that source (and not under default). Given the existing test coverage around import/sync and multi-source behavior, adding a focused test here would prevent this bug from recurring (ideally covering both PGLite and Postgres paths if the test suite supports it).
| const sql = this.sql; | ||
| const hash = page.content_hash || contentHash(page); | ||
| const frontmatter = page.frontmatter || {}; | ||
| const sourceId = page.source_id || 'default'; |
There was a problem hiding this comment.
putPage() uses page.source_id || 'default', which will silently treat an empty-string source id as the default source. That can reintroduce the “writes land under default” class of bugs if an upstream caller accidentally passes '' (or any other falsy value) instead of a real source id. Prefer nullish-coalescing (??) and/or validate that source_id is a non-empty string so invalid source IDs fail fast rather than falling back to default.
| const sourceId = page.source_id || 'default'; | |
| const sourceId = page.source_id ?? 'default'; | |
| if (sourceId.trim().length === 0) { | |
| throw new Error('page.source_id must be a non-empty string'); | |
| } |
| slug = validateSlug(slug); | ||
| const hash = page.content_hash || contentHash(page); | ||
| const frontmatter = page.frontmatter || {}; | ||
| const sourceId = page.source_id || 'default'; |
There was a problem hiding this comment.
putPage() uses page.source_id || 'default', which will silently treat an empty-string source id as the default source. That can reintroduce the “writes land under default” class of bugs if an upstream caller accidentally passes '' (or any other falsy value) instead of a real source id. Prefer nullish-coalescing (??) and/or validate that source_id is a non-empty string so invalid source IDs fail fast rather than falling back to default.
| const sourceId = page.source_id || 'default'; | |
| const rawSourceId = page.source_id; | |
| if (rawSourceId !== undefined && rawSourceId !== null && (typeof rawSourceId !== 'string' || rawSourceId.trim() === '')) { | |
| throw new Error('page.source_id must be a non-empty string when provided'); | |
| } | |
| const sourceId = rawSourceId ?? 'default'; |
| FROM pages WHERE slug = $1 AND source_id = $2 | ||
| RETURNING *`, | ||
| sourceId === undefined ? [slug] : [slug, sourceId] | ||
| ); |
There was a problem hiding this comment.
createVersion() returns rows[0] without checking whether the INSERT…SELECT matched a page. When the slug (or slug+source) doesn’t exist, this will return undefined while the method is typed as Promise<PageVersion>, leading to downstream runtime errors. Mirror the PostgresEngine behavior here by throwing a clear error when rows.length === 0.
| ); | |
| ); | |
| if (rows.length === 0) { | |
| throw new Error( | |
| sourceId === undefined | |
| ? `Page not found for slug "${slug}" and source_id "default"` | |
| : `Page not found for slug "${slug}" and source_id "${sourceId}"` | |
| ); | |
| } |
Summary
Fixes named-source sync/import writing pages and related import state to
defaultinstead of the requestedsources.id.Root cause
sync --source <id>was only source-scoping sync bookkeeping. The import/write path did not receivesourceId, so page writes fell back to the schema/default source, and slug-based page/chunk/tag lookups were source-blind.Changes
sourceIdthrough sync/import/import-file paths.sourceIdstill targetsource_id='default'.Validation
bun run typecheckopenclaw-support-kb: 0 pagestoopenclaw-support-kb: 635 pages.--source openclaw-support-kb.Fixes #540.
Need help on this PR? Tag
@codesmithwith what you need.